package com.bfxy.paya.service.impl;

import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.bfxy.paya.entity.CustomerAccount;
import com.bfxy.paya.mapper.CustomerAccountMapper;
import com.bfxy.paya.service.PayService;
import com.bfxy.paya.service.producer.CallbackService;
import com.bfxy.paya.service.producer.TransactionProducer;
import com.bfxy.paya.utils.FastJsonConvertUtil;

@Service
public class PayServiceImpl implements PayService {

    //topic主题
    public static final String TX_PAY_TOPIC = "tx_pay_topic";

    //tags
    public static final String TX_PAY_TAGS = "pay";

    //用戶的Mapper，對他的金額進行一個操作
    @Autowired
    private CustomerAccountMapper customerAccountMapper;

    //用於封裝生產者消息，並發送。主要是做一個分佈式事務消息的投遞
    @Autowired
    private TransactionProducer transactionProducer;

    //給訂單（order）那邊發確認消息，告訴訂單Order系統
    @Autowired
    private CallbackService callbackService;

    @Override
    public String payment(String userId, String orderId, String accountId, double money) {
        String paymentRet = "";//返回的結果，成功還是失敗
        try {
            //	最开始有一步 token验证操作（重复提单问题）

            BigDecimal payMoney = new BigDecimal(money);//支付金額
            //高並發的情況下
            // 1、首先要判斷一下賬戶有沒有這麼多錢，
            // 2、

            //加锁开始（获取）

            //查詢customer對象，看看錢夠不夠（獲取一些歷史的關鍵信息）
            CustomerAccount old = customerAccountMapper.selectByPrimaryKey(accountId);
            BigDecimal currentBalance = old.getCurrentBalance();//當前餘額
            int currentVersion = old.getVersion();//版本號，這裡是為了變更版本，不是為了樂觀鎖。
            //	要对大概率事件进行提前预判（小概率事件我们做放过,但是最后保障数据的一致性即可），比如同一时间一个账号同时两个人操作购买东西，如何保证一致性？
            //业务出发:
            //当前一个用户账户 只允许一个线程（一个应用端访问）
            //技术出发：
            //1 redis去重 分布式锁
            //2 数据库乐观锁去重
            //	做扣款操作的时候：获得分布式锁，看一下能否获得。減法操作，看看錢夠不夠，並且獲取到新的餘額。
            BigDecimal newBalance = currentBalance.subtract(payMoney);

            //加锁结束（释放）

            if (newBalance.doubleValue() > 0) {    //	或者一种情况获取锁失败
                //  以下兩步並行操作，所以序號都用1
                //	1.组装消息 --> 推送到payb服務中，就是發送到MQ中去（投遞消息到payb系統，讓他進行加錢操作）
                //  1.執行本地的事物，在TransactionListenerImpl中處理的

                //第一步
                String keys = UUID.randomUUID().toString() + "$" + System.currentTimeMillis();//消息唯一的ID：UUID+$+當前時間
                Map<String, Object> params = new HashMap<>();//要投遞（递）的消息
                params.put("userId", userId);//用戶ID
                params.put("orderId", orderId);//訂單ID
                params.put("accountId", accountId);//賬號
                params.put("money", money);//100，餘額。如果嚴謹一點應該把newBalance和money都傳進去再做一次判斷或者用版本號進行判斷，在並發的情況下可能出現問題，但幾率不高

                //組裝消息，Message(String topic, String tags, String keys, byte[] body)；payb平台要去監聽（监听）這個topic，FastJsonConvertUtil：序列化的工具類
                Message message = new Message(TX_PAY_TOPIC, TX_PAY_TAGS,
                        keys,
                        FastJsonConvertUtil.convertObjectToJSON(params).getBytes());

                // 發消息就得用到我們的事物消息，要開始進行封裝我們的RocketMQ的生產者了
                // 可能需要用到的参数，這些參數不會發送到MQ中去，但是本地事務中可能用的到
                params.put("payMoney", payMoney);
                params.put("newBalance", newBalance);
                params.put("currentVersion", currentVersion);

                // 同步阻塞。不加這個不知道本地事物成沒成功，因為本地事物和發消息是並行的。在本類中發消息在TransactionListenerImpl類中執行本地事物
                CountDownLatch countDownLatch = new CountDownLatch(1);
                params.put("currentCountDown", countDownLatch);//加上這個參數，用來做同步阻塞
                // 消息发送（發送到MQ） 并且 本地的事务执行（在TransactionListenerImpl中實現）
                TransactionSendResult sendResult = transactionProducer.sendMessage(message, params);

                // 發完消息之後先不去做下面的判斷，因為此時還不知道本地事物有沒有執行完成。先等一等本地事物，這就是同步阻塞。本地事物成功之後再放行
                countDownLatch.await();

                // 消息發送成功 + 本地事物成功 這兩個條件都成功情況下才可以認為 支付成功了。
                // 發消息和本地事物是並行的，在這裡消息發送成功了本地事物不一定執行完成，所以要做一個同步阻塞，在94行。
                // TODO 这里有一个问题：假如发送消息成功了并且本地事物成功了，但是payb中执行失败了会怎么办呢？
                //  我的理解：顾客已经购买完成商品了，钱也减掉了，失败的部分是商家，不应该因为这个而影响顾客的购物，这里商家自行解决就可以了。
                if (sendResult.getSendStatus() == SendStatus.SEND_OK
                        && sendResult.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {
                    /**
                     * 回调order通知支付成功消息。變更訂單信息，告訴訂單係統，我支付成功了你去派物流發件把
                     * 這裡為什麼做MQ解耦呢，沒必要做同步調用。
                     * 因為要做同步調用的話支付成功了難道還要跟訂單說一聲支付成功讓訂單給支付A返回一個結果嗎？
                     * 只需要保障訂單能收到這條消息就可以了
                     * 通過MQ就可以做到不同系統之間的消息傳遞（传递）和數據的解耦
                     */
                    callbackService.sendOKMessage(orderId, userId);
                    paymentRet = "支付成功!";
                } else {
                    paymentRet = "支付失败!";
                }
            } else {
                paymentRet = "余额不足!";
            }
        } catch (Exception e) {
            e.printStackTrace();
            paymentRet = "支付失败!";
        }
        return paymentRet;
    }

}
